package www.larkmidtable.com;

import com.alibaba.fastjson.JSONObject;
import com.ibm.db2.jcc.am.BatchUpdateException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import www.larkmidtable.com.channel.Channel;
import www.larkmidtable.com.util.DBType;
import www.larkmidtable.com.writer.Writer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

public class DB2Writer extends Writer {
    private final Queue<Connection> connectionQueue = new LinkedBlockingQueue<>();
    private final Queue<PreparedStatement> statementQueue = new LinkedBlockingQueue<>();
    private static final Logger logger = LoggerFactory.getLogger(DB2Writer.class);

    @Override
    public void open() {
        try {
            logger.info("DB2的Writer建立连接开始....");
            Class.forName(DBType.DB2.getDriverClass());
            Connection connection = DriverManager.getConnection(configBean.getUrl(), configBean.getUsername(), configBean.getPassword());
            connection.setAutoCommit(false);
            connectionQueue.add(connection);
            logger.info("DB2的Writer建立连接结束....");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void startWrite() {
        logger.info("开始写数据....");
        long startTime = System.currentTimeMillis();
        LinkedBlockingQueue<List<String>> cQueue = (LinkedBlockingQueue<List<String>>) Channel.getQueue();
        List<String> rList = cQueue.poll();
        if (rList != null && rList.size() > 0) {
            String[] columns = configBean.getColumn().split(",");
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < columns.length; i++) {
                sb.append("?,");
            }
            String whStr = sb.substring(0, sb.toString().length() - 1);
            String sql = String.format("insert into %s(%s) values (%s)", configBean.getTable(), configBean.getColumn(), whStr);
            try (Connection connection = connectionQueue.peek();
                 PreparedStatement statement = connection.prepareStatement(sql)) {
                statementQueue.add(statement);
                for (int i = 0; i < rList.size(); i++) {
                    JSONObject jsonObject = JSONObject.parseObject(rList.get(i));

                    for (int j = 1; j <= columns.length; j++) {
                        String colName = columns[j - 1].toUpperCase().trim();
                        statement.setObject(j, jsonObject.get(colName));
                    }
                    statement.addBatch();
                    if (i % 10000 == 0 && i > 0) {
                        statement.executeBatch();
                        connection.commit();
                        statement.clearBatch();
                    }
                }
                statement.executeBatch();
                connection.commit();
                statement.clearBatch();
            } catch (BatchUpdateException e){
                e.getNextException().printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        long endTime = System.currentTimeMillis();
        logger.info("写数据完成....耗时：" + (endTime - startTime) + "ms");
    }

    @Override
    public void close() {
        try {
            logger.info("DB2的Writer开始进行关闭连接开始....");
            while (statementQueue.size() != 0) {
                PreparedStatement preparedStatement = statementQueue.poll();
                if (Objects.nonNull(preparedStatement)) {
                    preparedStatement.close();
                }
            }
            while (connectionQueue.size() != 0) {
                Connection connection = connectionQueue.poll();
                if (Objects.nonNull(connection)) {
                    connection.close();
                }
            }
            logger.info("DB2的Writer开始进行关闭连接结束....");
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
